在上一课时,我们详细介绍了 Dubbo 传统架构面临的挑战,以及 Dubbo 2.7.5 版本引入的服务自省方案是如何应对这些挑战的。
本课时我们将从服务自省方案的基础设施开始介绍其具体实现。我们首先会介绍元数据相关的基础类的定义,然后介绍元数据的上报以及元数据服务的相关内容,同时还会介绍 Service ID 与 Service Name 是如何映射的。
Service Instance 唯一标识一个服务实例,在 Dubbo 的源码中对应 ServiceInstance 接口,该接口的具体定义如下:
public interface ServiceInstance extends Serializable {
// 唯一标识
String getId();
// 获取当前ServiceInstance所属的Service Name
String getServiceName();
// 获取当前ServiceInstance的host
String getHost();
// 获取当前ServiceInstance的port
Integer getPort();
// 当前ServiceInstance的状态
default boolean isEnabled() {
return true;
}
// 检测当前ServiceInstance的状态
default boolean isHealthy() {
return true;
}
// 获取当前ServiceInstance关联的元数据,这些元数据以KV格式存储
Map<String, String> getMetadata();
// 计算当前ServiceInstance对象的hashCode值
int hashCode();
// 比较两个ServiceInstance对象
boolean equals(Object another);
}
DefaultServiceInstance 是 ServiceInstance 的唯一实现,DefaultServiceInstance 是一个普通的 POJO 类,其中的核心字段如下。
Dubbo 元数据服务与我们业务中发布的 Dubbo 服务无异,Consumer 端可以调用一个 ServiceInstance 的元数据服务获取其发布的全部服务的元数据。
说到元数据,就不得不提到 ServiceDefinition 这个类,它可以来描述一个服务接口的定义,其核心字段如下。
TypeBuilder 接口实现关系图
在服务发布的时候,会将服务的 URL 中的部分数据封装为 FullServiceDefinition 对象,然后作为元数据存储起来。FullServiceDefinition 继承了 ServiceDefinition,并在 ServiceDefinition 基础之上扩展了 params 集合(Map<String, String> 类型),用来存储 URL 上的参数。
接下来看 MetadataService 接口,在上一讲我们提到Dubbo 中的每个 ServiceInstance 都会发布 MetadataService 接口供 Consumer 端查询元数据,下图展示了 MetadataService 接口的继承关系:
MetadataService 接口继承关系图
在 MetadataService 接口中定义了查询当前 ServiceInstance 发布的元数据的相关方法,具体如下所示:
public interface MetadataService {
String serviceName(); // 获取当前ServiceInstance所属服务的名称
default String version() {
return VERSION; // 获取当前MetadataService接口的版本
}
// 获取当前ServiceInstance订阅的全部URL
default SortedSet<String> getSubscribedURLs(){
throw new UnsupportedOperationException("This operation is not supported for consumer.");
}
// 获取当前ServiceInstance发布的全部URL
default SortedSet<String> getExportedURLs() {
return getExportedURLs(ALL_SERVICE_INTERFACES);
}
// 根据服务接口查找当前ServiceInstance暴露的全部接口
default SortedSet<String> getExportedURLs(String serviceInterface) {
return getExportedURLs(serviceInterface, null);
}
// 根据服务接口和group两个条件查找当前ServiceInstance暴露的全部接口
default SortedSet<String> getExportedURLs(String serviceInterface, String group) {
return getExportedURLs(serviceInterface, group, null);
}
// 根据服务接口、group和version三个条件查找当前ServiceInstance暴露的全部接口
default SortedSet<String> getExportedURLs(String serviceInterface, String group, String version) {
return getExportedURLs(serviceInterface, group, version, null);
}
// 根据服务接口、group、version和protocol四个条件查找当前ServiceInstance暴露的全部接口
SortedSet<String> getExportedURLs(String serviceInterface, String group, String version, String protocol);
// 根据指定条件查询ServiceDefinition
String getServiceDefinition(String interfaceName, String version, String group);
String getServiceDefinition(String serviceKey);
}
在 MetadataService 接口中定义的都是查询元数据的方法,在其子接口 WritableMetadataService 中添加了一些发布元数据的写方法,具体定义如下:
@SPI(DEFAULT_METADATA_STORAGE_TYPE)
public interface WritableMetadataService extends MetadataService {
@Override
default String serviceName() {
// ServiceName默认是从ApplicationModel中获取
// ExtensionLoader、DubboBootstrap以及ApplicationModel是单个Dubbo进程范围内的单例对象,
// ExtensionLoader用于Dubbo SPI机制加载扩展实现,DubboBootstrap用于启动Dubbo进程,
// ApplicationModel用于表示一个Dubbo实例,其中维护了多个ProviderModel对象表示当前Dubbo实例发布的服务,
// 维护了多个ConsumerModel对象表示当前Dubbo实例引用的服务。
return ApplicationModel.getApplication();
}
boolean exportURL(URL url); // 发布该URL所代表的服务
boolean unexportURL(URL url); // 注销该URL所代表的服务
default boolean refreshMetadata(String exportedRevision, String subscribedRevision) {
return true; // 刷新元数据
}
boolean subscribeURL(URL url); // 订阅该URL所代表的服务
boolean unsubscribeURL(URL url); // 取消订阅该URL所代表的服务
// 发布Provider端的ServiceDefinition
void publishServiceDefinition(URL providerUrl);
// 获取WritableMetadataService的默认扩展实现
static WritableMetadataService getDefaultExtension() {
return getExtensionLoader(WritableMetadataService.class).getDefaultExtension();
}
// 获取WritableMetadataService接口指定的扩展实现(无指定扩展名称,则返回默认扩展实现)
static WritableMetadataService getExtension(String name) {
return getExtensionLoader(WritableMetadataService.class).getOrDefaultExtension(name);
}
}
WritableMetadataService 接口被 @SPI 注解修饰,是一个扩展接口,在前面的继承关系图中也可以看出,它有两个比较基础的扩展实现,分别是 InMemoryWritableMetadataService(默认扩展实现) 和 RemoteWritableMetadataServiceDelegate,对应扩展名分别是 local 和 remote。
下面我们先来看 InMemoryWritableMetadataService 的实现,其中维护了三个核心集合。
<URL>
> 类型):用于记录当前 ServiceInstance 发布的 URL
集合,其中 Key 是 ServiceKey(即 interface、group 和 version 三部分构成),Value 是对应的
URL 集合。
<URL>
> 类型):用于记录当前 ServiceInstance 引用的 URL
集合,其中 Key 是 ServiceKey(即 interface、group 和 version 三部分构成),Value 是对应的
URL 集合。
InMemoryWritableMetadataService 对 getExportedURLs()、getSubscribedURLs() 以及 getServiceDefinition() 方法的实现,就是查询上述三个集合的数据;对 (un)exportURL()、(un)subscribeURL() 和 publishServiceDefinition() 方法的实现,就是增删上述三个集合的数据。
(un)exportURL()、(un)subscribeURL() 等方法都是非常简单的集合操作,我们就不再展示,你若感兴趣的话可以参考源码进行学习。 这里我们重点来看一下 publishServiceDefinition() 方法对 ServiceDefinition 的处理:
public void publishServiceDefinition(URL providerUrl) {
// 获取服务接口
String interfaceName = providerUrl.getParameter(INTERFACE_KEY);
if (StringUtils.isNotEmpty(interfaceName)
&& !ProtocolUtils.isGeneric(providerUrl.getParameter(GENERIC_KEY))) {
Class interfaceClass = Class.forName(interfaceName);
// 创建服务接口对应的ServiceDefinition对象
ServiceDefinition serviceDefinition = ServiceDefinitionBuilder.build(interfaceClass);
Gson gson = new Gson();
// 将ServiceDefinition对象序列化为JSON对象
String data = gson.toJson(serviceDefinition);
// 将ServiceDefinition对象序列化之后的JSON字符串记录到serviceDefinitions集合
serviceDefinitions.put(providerUrl.getServiceKey(), data);
return;
}
}
在 RemoteWritableMetadataService 实现中封装了一个 InMemoryWritableMetadataService 对象,并对 publishServiceDefinition() 方法进行了覆盖,具体实现如下:
public void publishServiceDefinition(URL url) {
// 获取URL中的side参数值,决定调用publishProvider()还是publishConsumer()方法
String side = url.getParameter(SIDE_KEY);
if (PROVIDER_SIDE.equalsIgnoreCase(side)) {
publishProvider(url);
} else {
publishConsumer(url);
}
}
在 publishProvider() 方法中,首先会根据 Provider URL 创建对应的 FullServiceDefinition 对象,然后通过 MetadataReport 进行上报,具体实现如下:
private void publishProvider(URL providerUrl) throws RpcException {
// 删除pid、timestamp、bind.ip、bind.port等参数
providerUrl = providerUrl.removeParameters(PID_KEY, TIMESTAMP_KEY, Constants.BIND_IP_KEY,
Constants.BIND_PORT_KEY, TIMESTAMP_KEY);
// 获取服务接口名称
String interfaceName = providerUrl.getParameter(INTERFACE_KEY);
if (StringUtils.isNotEmpty(interfaceName)) {
Class interfaceClass = Class.forName(interfaceName); // 反射
// 创建服务接口对应的FullServiceDefinition对象,URL中的参数会记录到FullServiceDefinition的params集合中
FullServiceDefinition fullServiceDefinition = ServiceDefinitionBuilder.buildFullDefinition(interfaceClass,
providerUrl.getParameters());
// 获取MetadataReport并上报FullServiceDefinition
getMetadataReport().storeProviderMetadata(new MetadataIdentifier(providerUrl.getServiceInterface(),
providerUrl.getParameter(VERSION_KEY), providerUrl.getParameter(GROUP_KEY),
PROVIDER_SIDE, providerUrl.getParameter(APPLICATION_KEY)), fullServiceDefinition);
return;
}
}
publishConsumer() 方法则相对比较简单:首先会清理 Consumer URL 中 pid、timestamp 等参数,然后将 Consumer URL 中的参数集合进行上报。
不过,在 RemoteWritableMetadataService 中的 exportURL()、subscribeURL()、getExportedURLs()、getServiceDefinition() 等一系列方法都是空实现,这是为什么呢?其实我们从 RemoteWritableMetadataServiceDelegate 中就可以找到答案,注意,RemoteWritableMetadataServiceDelegate 才是 MetadataService 接口的 remote 扩展实现。
在 RemoteWritableMetadataServiceDelegate 中同时维护了一个 InMemoryWritableMetadataService 对象和 RemoteWritableMetadataService 对象,exportURL()、subscribeURL() 等发布订阅相关的方法会同时委托给这两个 MetadataService 对象,getExportedURLs()、getServiceDefinition() 等查询方法则只会调用 InMemoryWritableMetadataService 对象进行查询。这里我们以 exportURL() 方法为例进行说明:
public boolean exportURL(URL url) {
return doFunction(WritableMetadataService::exportURL, url);
}
private boolean doFunction(BiFunction<WritableMetadataService, URL, Boolean> func, URL url) {
// 同时调用InMemoryWritableMetadataService对象和RemoteWritableMetadataService对象的exportURL()方法
return func.apply(defaultWritableMetadataService, url) && func.apply(remoteWritableMetadataService, url);
}
元数据中心是 Dubbo 2.7.0 版本之后新增的一项优化,其主要目的是将 URL 中的一部分内容存储到元数据中心,从而减少注册中心的压力。
元数据中心的数据只是给本端自己使用的,改动不需要告知对端,例如,Provider 修改了元数据,不需要实时通知 Consumer。这样,在注册中心存储的数据量减少的同时,还减少了因为配置修改导致的注册中心频繁通知监听者情况的发生,很好地减轻了注册中心的压力。
MetadataReport 接口是 Dubbo 节点与元数据中心交互的桥梁,其继承关系如下图所示:
MetadataReport 继承关系图
我们先来看一下 MetadataReport 接口的核心定义:
public interface MetadataReport {
// 存储Provider元数据
void storeProviderMetadata(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition);
// 存储Consumer元数据
void storeConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, Map<String, String> serviceParameterMap);
// 存储、删除Service元数据
void saveServiceMetadata(ServiceMetadataIdentifier metadataIdentifier, URL url);
void removeServiceMetadata(ServiceMetadataIdentifier metadataIdentifier);
// 查询暴露的URL
List<String> getExportedURLs(ServiceMetadataIdentifier metadataIdentifier);
// 查询订阅数据
void saveSubscribedData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, Set<String> urls);
List<String> getSubscribedURLs(SubscriberMetadataIdentifier subscriberMetadataIdentifier);
// 查询ServiceDefinition
String getServiceDefinition(MetadataIdentifier metadataIdentifier);
}
了解了 MetadataReport 接口定义的核心行为之后,接下来我们就按照其实现的顺序来介绍:先来分析 AbstractMetadataReport 抽象类提供的公共实现,然后以 ZookeeperMetadataReport 这个具体实现为例,介绍 MetadataReport 如何与 ZooKeeper 配合实现元数据上报。
AbstractMetadataReport 中提供了所有 MetadataReport 的公共实现,其核心字段如下:
private URL reportURL; // 元数据中心的URL,其中包含元数据中心的地址
// 本地磁盘缓存,用来缓存上报的元数据
File file;
final Properties properties = new Properties();
// 内存缓存
final Map<MetadataIdentifier, Object> allMetadataReports = new ConcurrentHashMap<>(4);
// 该线程池除了用来同步本地内存缓存与文件缓存,还会用来完成异步上报的功能
private final ExecutorService reportCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveMetadataReport", true));
// 用来暂存上报失败的元数据,后面会有定时任务进行重试
final Map<MetadataIdentifier, Object> failedReports = new ConcurrentHashMap<>(4);
boolean syncReport; // 是否同步上报元数据
// 记录最近一次元数据上报的版本,单调递增
private final AtomicLong lastCacheChanged = new AtomicLong();
// 用于重试的定时任务
public MetadataReportRetry metadataReportRetry;
// 当前MetadataReport实例是否已经初始化
private AtomicBoolean initialized = new AtomicBoolean(false);
在 AbstractMetadataReport 的构造方法中,首先会初始化本地的文件缓存,然后创建 MetadataReportRetry 重试任务,并启动一个周期性刷新的定时任务,具体实现如下:
public AbstractMetadataReport(URL reportServerURL) {
setUrl(reportServerURL);
// 默认的本地文件缓存
String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-metadata-" + reportServerURL.getParameter(APPLICATION_KEY) + "-" + reportServerURL.getAddress().replaceAll(":", "-") + ".cache";
String filename = reportServerURL.getParameter(FILE_KEY, defaultFilename);
File file = null;
if (ConfigUtils.isNotEmpty(filename)) {
file = new File(filename);
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
if (!file.getParentFile().mkdirs()) {
throw new IllegalArgumentException("...");
}
}
if (!initialized.getAndSet(true) && file.exists()) {
file.delete();
}
}
this.file = file;
// 将file文件中的内容加载到properties字段中
loadProperties();
// 是否同步上报元数据
syncReport = reportServerURL.getParameter(SYNC_REPORT_KEY, false);
// 创建重试任务
metadataReportRetry = new MetadataReportRetry(reportServerURL.getParameter(RETRY_TIMES_KEY, DEFAULT_METADATA_REPORT_RETRY_TIMES),
reportServerURL.getParameter(RETRY_PERIOD_KEY, DEFAULT_METADATA_REPORT_RETRY_PERIOD));
// 是否周期性地上报元数据
if (reportServerURL.getParameter(CYCLE_REPORT_KEY, DEFAULT_METADATA_REPORT_CYCLE_REPORT)) {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboMetadataReportTimer", true));
// 默认每隔1天将本地元数据全部刷新到元数据中心
scheduler.scheduleAtFixedRate(this::publishAll, calculateStartTime(), ONE_DAY_IN_MILLISECONDS, TimeUnit.MILLISECONDS);
}
}
在 AbstractMetadataReport.storeProviderMetadata() 方法中,首先会根据 syncReport 字段值决定是同步上报还是异步上报:如果是同步上报,则在当前线程执行上报操作;如果是异步上报,则在 reportCacheExecutor 线程池中执行上报操作。具体的上报操作是在storeProviderMetadataTask() 方法中完成的:
private void storeProviderMetadataTask(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) {
try {
// 将元数据记录到allMetadataReports集合
allMetadataReports.put(providerMetadataIdentifier, serviceDefinition);
// 如果之前上报失败,则在failedReports集合中有记录,这里上报成功之后会将其删除
failedReports.remove(providerMetadataIdentifier);
// 将元数据序列化成JSON字符串
Gson gson = new Gson();
String data = gson.toJson(serviceDefinition);
// 上报序列化后的元数据
doStoreProviderMetadata(providerMetadataIdentifier, data);
// 将序列化后的元数据保存到本地文件缓存中
saveProperties(providerMetadataIdentifier, data, true, !syncReport);
} catch (Exception e) {
// 如果上报失败,则在failedReports集合中进行记录,然后由metadataReportRetry任务中进行重试
failedReports.put(providerMetadataIdentifier, serviceDefinition);
metadataReportRetry.startRetryTask();
}
}
我们可以看到这里调用了 doStoreProviderMetadata() 方法和 saveProperties() 方法。其中, doStoreProviderMetadata() 方法是一个抽象方法,对于不同的元数据中心实现有不同的实现,这个方法的具体实现在后面会展开分析。saveProperties() 方法中会更新 properties 字段,递增本地缓存文件的版本号,最后(同步/异步)执行 SaveProperties 任务,更新本地缓存文件的内容,具体实现如下:
private void saveProperties(MetadataIdentifier metadataIdentifier, String value, boolean add, boolean sync) {
if (file == null) {
return;
}
if (add) { // 更新properties中的元数据
properties.setProperty(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), value);
} else {
properties.remove(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
}
// 递增版本
long version = lastCacheChanged.incrementAndGet();
if (sync) { // 同步更新本地缓存文件
new SaveProperties(version).run();
} else { // 异步更新本地缓存文件
reportCacheExecutor.execute(new SaveProperties(version));
}
}
下面我们再来看 SaveProperties 任务的核心方法—— doSaveProperties() 方法,该方法中实现了刷新本地缓存文件的全部操作。
private void doSaveProperties(long version) {
if (version < lastCacheChanged.get()) { // 对比当前版本号和此次SaveProperties任务的版本号
return;
}
if (file == null) { // 检测本地缓存文件是否存在
return;
}
try {
// 创建lock文件
File lockfile = new File(file.getAbsolutePath() + ".lock");
if (!lockfile.exists()) {
lockfile.createNewFile();
}
try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
FileChannel channel = raf.getChannel()) {
FileLock lock = channel.tryLock(); // 对lock文件加锁
if (lock == null) {
throw new IOException("Can not lock the metadataReport cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.metadata.file=xxx.properties");
}
try {
if (!file.exists()) { // 保证本地缓存文件存在
file.createNewFile();
}
// 将properties中的元数据保存到本地缓存文件中
try (FileOutputStream outputFile = new FileOutputStream(file)) {
properties.store(outputFile, "Dubbo metadataReport Cache");
}
} finally {
lock.release(); // 释放lock文件上的锁
}
}
} catch (Throwable e) {
if (version < lastCacheChanged.get()) { // 比较版本号
return;
} else { // 如果写文件失败,则重新提交SaveProperties任务,再次尝试
reportCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
}
}
}
了解了刷新本地缓存文件的核心逻辑之后,我们再来看 AbstractMetadataReport 中失败重试的逻辑。MetadataReportRetry 中维护了如下核心字段:
// 执行重试任务的线程池
ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(0, new NamedThreadFactory("DubboMetadataReportRetryTimer", true));
// 重试任务关联的Future对象
volatile ScheduledFuture retryScheduledFuture;
// 记录重试任务的次数
final AtomicInteger retryCounter = new AtomicInteger(0);
// 重试任务的时间间隔
long retryPeriod;
// 无失败上报的元数据之后,重试任务会再执行600次,才会销毁
int retryTimesIfNonFail = 600;
// 失败重试的次数上限,默认为100次,即重试失败100次之后会放弃
int retryLimit;
在 startRetryTask() 方法中,MetadataReportRetry 会创建一个重试任务,并提交到 retryExecutor 线程池中等待执行(如果已存在重试任务,则不会创建新任务)。在重试任务中会调用 AbstractMetadataReport.retry() 方法完成重新上报,当然也会判断 retryLimit 等执行条件,具体实现比较简单,这里就不再展示,你若感兴趣的话可以参考源码进行学习。
AbstractMetadataReport.retry() 方法的具体实现如下:
public boolean retry() {
return doHandleMetadataCollection(failedReports);
}
private boolean doHandleMetadataCollection(Map<MetadataIdentifier, Object> metadataMap) {
if (metadataMap.isEmpty()) { // 没有上报失败的元数据
return true;
}
// 遍历failedReports集合中失败上报的元数据,逐个调用storeProviderMetadata()方法或storeConsumerMetadata()方法重新上报
Iterator<Map.Entry<MetadataIdentifier, Object>> iterable = metadataMap.entrySet().iterator();
while (iterable.hasNext()) {
Map.Entry<MetadataIdentifier, Object> item = iterable.next();
if (PROVIDER_SIDE.equals(item.getKey().getSide())) {
this.storeProviderMetadata(item.getKey(), (FullServiceDefinition) item.getValue());
} else if (CONSUMER_SIDE.equals(item.getKey().getSide())) {
this.storeConsumerMetadata(item.getKey(), (Map) item.getValue());
}
}
return false;
}
在 AbstractMetadataReport 的构造方法中,会根据 reportServerURL(也就是后面的 metadataReportURL)参数启动一个“天”级别的定时任务,该定时任务会执行 publishAll() 方法,其中会通过 doHandleMetadataCollection() 方法将 allMetadataReports 集合中的全部元数据重新进行上报。该定时任务默认是在凌晨 02:00~06:00 启动,每天执行一次。
到此为止,AbstractMetadataReport 为子类实现的公共能力就介绍完了,其他方法都是委托给了相应的 do*() 方法,这些 do*() 方法都是在 AbstractMetadataReport 子类中实现的。
在 AbstractMetadataReport 上报元数据的时候,元数据对应的 Key 都是BaseMetadataIdentifier 类型的对象,其继承关系如下图所示:
BaseMetadataIdentifier 继承关系图
MetadataReportFactory 是用来创建 MetadataReport 实例的工厂,具体定义如下:
@SPI("redis")
public interface MetadataReportFactory {
@Adaptive({"protocol"})
MetadataReport getMetadataReport(URL url);
}
MetadataReportFactory 是个扩展接口,从 @SPI 注解的默认值可以看出Dubbo 默认使用 Redis 实现元数据中心。 Dubbo 提供了针对 ZooKeeper、Redis、Consul 等作为元数据中心的 MetadataReportFactory 实现,如下图所示:
MetadataReportFactory 继承关系图
这些 MetadataReportFactory 实现都继承了 AbstractMetadataReportFactory,在 AbstractMetadataReportFactory 提供了缓存 MetadataReport 实现的功能,并定义了一个 createMetadataReport() 抽象方法供子类实现。另外,AbstractMetadataReportFactory 实现了 MetadataReportFactory 接口的 getMetadataReport() 方法,下面我们就来简单看一下该方法的实现:
public MetadataReport getMetadataReport(URL url) {
// 清理export、refer参数
url = url.setPath(MetadataReport.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY);
String key = url.toServiceString();
LOCK.lock();
try {
// 从SERVICE_STORE_MAP集合(ConcurrentHashMap<String, MetadataReport>类型)中查询是否已经缓存有对应的MetadataReport对象
MetadataReport metadataReport = SERVICE_STORE_MAP.get(key);
if (metadataReport != null) { // 直接返回缓存的MetadataReport对象
return metadataReport;
}
// 创建新的MetadataReport对象,createMetadataReport()方法由子类具体实现
metadataReport = createMetadataReport(url);
// 将MetadataReport缓存到SERVICE_STORE_MAP集合中
SERVICE_STORE_MAP.put(key, metadataReport);
return metadataReport;
} finally {
LOCK.unlock();
}
}
MetadataReportInstance 是一个单例对象,其中会获取 MetadataReportFactory 的适配器,并根据 init() 方法传入的 metadataReportURL 选择对应的 MetadataReportFactory 创建 MetadataReport 实例,这也是当前 Dubbo 进程全局唯一的 MetadataReport 实例。
MetadataReportInstance 的具体实现比较简单,这里就不再展示,你若感兴趣的话可以参考源码进行学习。
下面我们来看 dubbo-metadata-report-zookeeper 模块是如何接入 ZooKeeper 作为元数据中心的。
我们首先关注 dubbo-metadata-report-zookeeper 模块的 SPI 文件,可以看到 ZookeeperMetadataReportFactory 的扩展名称是 zookeeper:
zookeeper=org.apache.dubbo.metadata.store.zookeeper.ZookeeperMetadataReportFactory
在 ZookeeperMetadataReportFactory 的 createMetadataReport() 方法中会创建 ZookeeperMetadataReport 这个 MetadataReport 实现类的对象。
在 ZookeeperMetadataReport 中维护了一个 ZookeeperClient 实例用来和 ZooKeeper 进行交互。ZookeeperMetadataReport 读写元数据的根目录是 metadataReportURL 的 group 参数值,默认值为 dubbo。
下面再来看 ZookeeperMetadataReport 对 AbstractMetadataReport 中各个 do*() 方法的实现,这些方法核心都是通过 ZookeeperClient 创建、查询、删除对应的 ZNode 节点,没有什么复杂的逻辑,关键是明确一下操作的 ZNode 节点的 path 是什么。
doStoreProviderMetadata() 方法和 doStoreConsumerMetadata() 方法会调用 storeMetadata() 创建相应的 ZNode 节点:
private void storeMetadata(MetadataIdentifier metadataIdentifier, String v) {
zkClient.create(getNodePath(metadataIdentifier), v, false);
}
String getNodePath(BaseMetadataIdentifier metadataIdentifier) {
return toRootDir() + metadataIdentifier.getUniqueKey(KeyTypeEnum.PATH);
}
MetadataIdentifier 对象对应 ZNode 节点的 path 默认格式是 :
/dubbo/metadata/服务接口/version/group/side/application
对应 ZNode 节点的 Value 是 ServiceDefinition 序列化后的 JSON 字符串。
doSaveMetadata()、doRemoveMetadata() 以及 doGetExportedURLs() 方法参数是 ServiceMetadataIdentifier 对象,对应的 ZNode 节点 path 是:
/dubbo/metadata/服务接口/version/group/side/protocol/revision
doSaveSubscriberData()、doGetSubscribedURLs() 方法的参数是 SubscriberMetadataIdentifier 对象,对应的 ZNode 节点 path 是:
/dubbo/metadata/服务接口/version/group/side/revision
了解了 MetadataService 接口的核心功能和底层实现之后,我们接着再来看 MetadataServiceExporter 接口,这个接口负责将 MetadataService 接口作为一个 Dubbo 服务发布出去。
下面来看 MetadataServiceExporter 接口的具体定义:
public interface MetadataServiceExporter {
// 将MetadataService作为一个Dubbo服务发布出去
MetadataServiceExporter export();
// 注销掉MetadataService服务
MetadataServiceExporter unexport();
// MetadataService可能以多种协议发布,这里返回发布MetadataService服务的所有URL
List<URL> getExportedURLs();
// 检测MetadataService服务是否已经发布
boolean isExported();
}
MetadataServiceExporter 只有 ConfigurableMetadataServiceExporter 这一个实现,如下图所示:
MetadataServiceExporter 继承关系图
ConfigurableMetadataServiceExporter 的核心实现是 export() 方法,其中会创建一个 ServiceConfig 对象完成 MetadataService 服务的发布:
public ConfigurableMetadataServiceExporter export() {
if (!isExported()) {
// 创建ServiceConfig对象
ServiceConfig<MetadataService> serviceConfig = new ServiceConfig<>();
serviceConfig.setApplication(getApplicationConfig());
serviceConfig.setRegistries(getRegistries());
serviceConfig.setProtocol(generateMetadataProtocol()); // 设置Protocol(默认是Dubbo)
serviceConfig.setInterface(MetadataService.class); // 设置服务接口
serviceConfig.setRef(metadataService); // 设置MetadataService对象
serviceConfig.setGroup(getApplicationConfig().getName()); // 设置group
serviceConfig.setVersion(metadataService.version()); // 设置version
// 发布MetadataService服务,ServiceConfig发布服务的流程在前面已经详细分析过了,这里不再展开
serviceConfig.export();
this.serviceConfig = serviceConfig;
} else {
... // 输出日志
}
return this;
}
ServiceNameMapping 接口的主要功能是实现 Service ID 到 Service Name 之间的转换,底层会依赖配置中心实现数据存储和查询。ServiceNameMapping 接口的定义如下:
@SPI("default")
public interface ServiceNameMapping {
// 服务接口、group、version、protocol四部分构成了Service ID,并与当前Service Name之间形成映射,记录到配置中心
void map(String serviceInterface, String group, String version, String protocol);
// 根据服务接口、group、version、protocol四部分构成的Service ID,查询对应的Service Name
Set<String> get(String serviceInterface, String group, String version, String protocol);
// 获取默认的ServiceNameMapping接口的扩展实现
static ServiceNameMapping getDefaultExtension() {
return getExtensionLoader(ServiceNameMapping.class).getDefaultExtension();
}
}
DynamicConfigurationServiceNameMapping 是 ServiceNameMapping 的默认实现,也是唯一实现,其中会依赖 DynamicConfiguration 读写配置中心,完成 Service ID 和 Service Name 的映射。首先来看 DynamicConfigurationServiceNameMapping 的 map() 方法:
public void map(String serviceInterface, String group, String version, String protocol) {
// 跳过MetadataService接口的处理
if (IGNORED_SERVICE_INTERFACES.contains(serviceInterface)) {
return;
}
// 获取DynamicConfiguration对象
DynamicConfiguration dynamicConfiguration = DynamicConfiguration.getDynamicConfiguration();
// 从ApplicationModel中获取Service Name
String key = getName();
String content = valueOf(System.currentTimeMillis());
execute(() -> {
// 在配置中心创建映射关系,这里的buildGroup()方法虽然接收四个参数,但是只使用了serviceInterface
// 也就是使用创建了服务接口到Service Name的映射
// 可以暂时将配置中心理解为一个KV存储,这里的Key是buildGroup()方法返回值+Service Name构成的,value是content(即时间戳)
dynamicConfiguration.publishConfig(key, buildGroup(serviceInterface, group, version, protocol), content);
});
}
在 DynamicConfigurationServiceNameMapping.get() 方法中,会根据传入的服务接口名称、group、version、protocol 组成 Service ID,查找对应的 Service Name,如下所示:
public Set<String> get(String serviceInterface, String group, String version, String protocol) {
// 获取DynamicConfiguration对象
DynamicConfiguration dynamicConfiguration = DynamicConfiguration.getDynamicConfiguration();
// 根据Service ID从配置查找Service Name
Set<String> serviceNames = new LinkedHashSet<>();
execute(() -> {
Set<String> keys = dynamicConfiguration.getConfigKeys(buildGroup(serviceInterface, group, version, protocol));
serviceNames.addAll(keys);
});
// 返回查找到的全部Service Name
return Collections.unmodifiableSet(serviceNames);
}
本课时我们重点介绍了服务自省架构中元数据相关的实现。